package org.codehaus.activemq.service.boundedvm;

import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.jms.JMSException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.broker.BrokerClient;
import org.codehaus.activemq.filter.Filter;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.util.BoundedPacketQueue;
import org.codehaus.activemq.service.Service;

/**
 * Holds the transient topic subscriptions of a single {@link BrokerClient} and
 * delivers matching messages to that client through a bounded queue.
 * <p>
 * Messages accepted by {@link #targetAndDispatch(ActiveMQMessage)} are placed on
 * the {@link BoundedPacketQueue} supplied at construction time. Once
 * {@link #start()} has been called, a worker thread takes messages off that
 * queue and hands them to the client via {@code BrokerClient.dispatch}.
 */
public class TransientTopicBoundedMessageContainer
  implements Service, Runnable
{
  /** Priority of the worker thread: one step above the normal priority. */
  private static final int WORKER_PRIORITY = Thread.NORM_PRIORITY + 1;

  /** Timeout passed to the queue's dequeue call (presumably milliseconds - confirm against BoundedPacketQueue). */
  private static final long DEQUEUE_TIMEOUT = 2000L;

  /** Number of dispatched messages after which the worker yields the CPU once. */
  private static final int YIELD_INTERVAL = 250;

  private final SynchronizedBoolean started;
  private final BrokerClient client;
  private final BoundedPacketQueue queue;
  private Thread worker;
  private final CopyOnWriteArrayList subscriptions;
  private final Log log;

  /**
   * Creates a container in the stopped state with no subscriptions.
   *
   * @param client the client that dequeued messages are dispatched to
   * @param queue  the queue used to buffer messages between
   *               {@link #targetAndDispatch(ActiveMQMessage)} and the worker thread
   */
  public TransientTopicBoundedMessageContainer(BrokerClient client, BoundedPacketQueue queue)
  {
    this.client = client;
    this.queue = queue;
    this.started = new SynchronizedBoolean(false);
    this.subscriptions = new CopyOnWriteArrayList();
    this.log = LogFactory.getLog("TransientTopicBoundedMessageContainer for client: " + client);
  }

  /**
   * @return {@code true} when this container currently holds no subscriptions
   */
  public boolean isInactive()
  {
    return this.subscriptions.isEmpty();
  }

  /**
   * Registers a subscription for the given consumer unless a subscription with
   * an equal {@link ConsumerInfo} is already present.
   *
   * @param filter the filter handed to the new subscription
   * @param info   the consumer the subscription belongs to
   */
  public void addConsumer(Filter filter, ConsumerInfo info)
  {
    if (findMatch(info) == null) {
      this.subscriptions.add(new TransientTopicSubscription(filter, info));
    }
  }

  /**
   * Removes the subscription whose {@link ConsumerInfo} equals {@code info};
   * does nothing when there is no such subscription.
   *
   * @param info the consumer whose subscription should be removed
   */
  public void removeConsumer(ConsumerInfo info)
  {
    TransientTopicSubscription ts = findMatch(info);
    if (ts != null) {
      this.subscriptions.remove(ts);
    }
  }

  /**
   * Starts the worker thread. The thread is only created when the started flag
   * could be switched from {@code false} to {@code true}, so calling this on an
   * already started container has no effect.
   */
  public void start()
  {
    if (this.started.commit(false, true)) {
      this.worker = new Thread(this);
      this.worker.setPriority(WORKER_PRIORITY);
      this.worker.start();
    }
  }

  /**
   * Collects every subscription that reports the message as a target and, if
   * there is at least one, enqueues the message for the client.
   *
   * @param message the message to test against the subscriptions
   * @return {@code true} if at least one subscription matched the message
   * @throws JMSException if matching or enqueueing fails
   */
  public boolean targetAndDispatch(ActiveMQMessage message)
    throws JMSException
  {
    // Allocated lazily so that a message nobody wants costs no allocation.
    List matched = null;
    for (Iterator i = this.subscriptions.iterator(); i.hasNext(); ) {
      TransientTopicSubscription ts = (TransientTopicSubscription)i.next();
      if (ts.isTarget(message)) {
        if (matched == null) {
          matched = new ArrayList();
        }
        matched.add(ts);
      }
    }
    dispatchToQueue(message, matched);
    return matched != null;
  }

  /**
   * Clears the started flag, which makes the worker loop end on its next
   * check, and clears the queue.
   */
  public void stop()
  {
    this.started.set(false);
    this.queue.clear();
  }

  /**
   * Stops the container if it is started and then closes the queue.
   */
  public void close()
  {
    if (this.started.get()) {
      stop();
    }
    this.queue.close();
  }

  /**
   * Worker loop: while the container is started, takes messages off the queue
   * and dispatches them to the client. Any exception stops the container and
   * is logged as a warning.
   */
  public void run()
  {
    int count = 0;
    while (this.started.get()) {
      try {
        // A null result means no message was obtained from this dequeue call.
        ActiveMQMessage message = (ActiveMQMessage)this.queue.dequeue(DEQUEUE_TIMEOUT);
        if (message != null) {
          this.client.dispatch(message);
          count++;
          if (count == YIELD_INTERVAL) {
            // Give other threads a chance after a burst of dispatches.
            count = 0;
            Thread.yield();
          }
        }
      }
      catch (Exception e) {
        stop();
        this.log.warn("stop dispatching", e);
        if (e instanceof InterruptedException) {
          // Catching the exception cleared the interrupt status; restore it
          // and leave the loop, since an interrupt is a request to terminate.
          Thread.currentThread().interrupt();
          return;
        }
      }
    }
  }

  /**
   * Stamps the message with the consumer numbers of the given subscriptions
   * and places it on the queue. Does nothing when {@code list} is {@code null}.
   * If the enqueue is interrupted the container is closed and the calling
   * thread's interrupt status is restored.
   *
   * @param message the message to enqueue
   * @param list    the matching {@code TransientTopicSubscription}s, or {@code null} if none matched
   * @throws JMSException if the message cannot be enqueued
   */
  private void dispatchToQueue(ActiveMQMessage message, List list) throws JMSException
  {
    if (list == null) {
      return;
    }
    int size = list.size();
    int[] ids = new int[size];
    for (int i = 0; i < size; i++) {
      TransientTopicSubscription ts = (TransientTopicSubscription)list.get(i);
      ids[i] = ts.getConsumerInfo().getConsumerNo();
    }
    message.setConsumerNos(ids);
    try {
      this.queue.enqueue(message);
    }
    catch (InterruptedException e) {
      this.log.warn("queue interuppted, closing", e);
      try {
        close();
      }
      finally {
        // Restore the interrupt status only after close() so that the
        // cleanup itself is not affected by the pending interrupt.
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Finds the first subscription whose {@link ConsumerInfo} equals {@code info}.
   *
   * @param info the consumer to look for
   * @return the matching subscription, or {@code null} if there is none
   */
  private TransientTopicSubscription findMatch(ConsumerInfo info) {
    for (Iterator i = this.subscriptions.iterator(); i.hasNext(); ) {
      TransientTopicSubscription ts = (TransientTopicSubscription)i.next();
      if (ts.getConsumerInfo().equals(info)) {
        return ts;
      }
    }
    return null;
  }
}